<?php

namespace simpleHandle\Component\UtilMQ;

use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire;
use Throwable;
use Exception;
use simpleHandle\Exception\UtilException;
use simpleHandle\Component\UtilTool\Tool;

/**
 * Thin wrapper around a php-amqplib connection and channel.
 *
 * Exposes low-level helpers (declare / bind / publish / consume) plus two
 * convenience entry points: send() publishes one message with publisher
 * confirms, listen() declares the topology and consumes from a queue.
 * Every failure raised inside the wrapped calls is rethrown as UtilException
 * with code UtilException::EasyMQ_ERROR_CODE.
 */
class RabbitMQ
{
	/** ID assigned to the most recent message passed to send(); empty until then. */
	public string               $messageId = '';
	/** RabbitMQ connection. */
	public AMQPStreamConnection $MQConn;
	/** RabbitMQ channel opened on the connection. */
	public AMQPChannel          $MQChan;
	/** Exchange used by send()/listen(); also updated by exchangeDeclare(). */
	public string               $exchangeName;
	/** Prefix prepended to the channel name to build queue names in send()/listen(). */
	public string               $queuePrefix;
	/** Default settings; caller-supplied values are merged on top in the constructor. */
	private array               $config = [
		"host"         => "127.0.0.1",
		"port"         => 5672,
		"username"     => "guest",
		"password"     => "guest",
		"vhost"        => "/",
		"exchangeName" => "exchangeName",
		"queuePrefix"  => "queuePrefix"
	];

	/**
	 * Open the connection and, when connected, a channel.
	 *
	 * @param array $config Overrides for the default settings (host, port,
	 *                      username, password, vhost, exchangeName, queuePrefix).
	 * @throws UtilException When connecting or opening the channel fails.
	 */
	public function __construct( array $config = [] ) {
		try {
			// Merge overrides onto the defaults and read every setting from the
			// merged result, so keys the caller omitted fall back to defaults.
			$this->config       = array_replace_recursive( $this->config, $config );
			$this->MQConn       = new AMQPStreamConnection(
				$this->config['host'],
				$this->config['port'],
				$this->config['username'],
				$this->config['password'],
				$this->config['vhost']
			);
			$this->exchangeName = $this->config["exchangeName"] ?? "";
			$this->queuePrefix  = $this->config["queuePrefix"] ?? "";
			if ( $this->MQConn->isConnected() ) {
				$this->MQChan = $this->MQConn->channel();
			}
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Declare an exchange and remember its name in $exchangeName.
	 *
	 * @param string $exchangeName Exchange name.
	 * @param string $type         Exchange type.
	 * @param bool   $durable      Whether the exchange is durable.
	 * @param bool   $autoDelete   Whether the exchange is auto-deleted.
	 * @param array  $arguments    Extra declaration arguments.
	 * @throws UtilException
	 */
	public function exchangeDeclare( string $exchangeName, string $type, bool $durable, bool $autoDelete, array $arguments = [] ) {
		try {
			$this->MQChan->exchange_declare(
				$exchangeName,
				$type,
				false,
				$durable,
				$autoDelete,
				false,
				false,
				$arguments
			);
			$this->exchangeName = $exchangeName;
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Declare a queue.
	 *
	 * NOTE(review): this method also overwrites $queuePrefix with the full
	 * queue name. That side effect is kept for existing direct callers, but it
	 * changes the queue names later built by send()/listen(); confirm whether
	 * it is still wanted.
	 *
	 * @param string $queueName  Queue name.
	 * @param bool   $durable    Whether the queue is durable.
	 * @param bool   $autoDelete Whether the queue is auto-deleted.
	 * @param array  $arguments  Extra declaration arguments.
	 * @throws UtilException
	 */
	public function queueDeclare( string $queueName, bool $durable, bool $autoDelete, $arguments = [] ) {
		try {
			$this->declareQueue( $queueName, $durable, $autoDelete, $arguments );
			$this->queuePrefix = $queueName;
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Bind a queue to an exchange.
	 *
	 * @param string $queueName    Queue name.
	 * @param string $exchangeName Exchange name.
	 * @param string $routingKey   Routing key.
	 * @param array  $arguments    Extra binding arguments.
	 * @throws UtilException
	 */
	public function queueBind( string $queueName, string $exchangeName, string $routingKey, array $arguments = [] ) {
		try {
			$this->MQChan->queue_bind(
				$queueName,
				$exchangeName,
				$routingKey,
				false,
				$arguments
			);
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Publish a message to an exchange.
	 *
	 * @param AMQPMessage $msg          Message to publish.
	 * @param string      $exchangeName Exchange name.
	 * @param string      $routingKey   Routing key.
	 * @throws UtilException
	 */
	public function publish( AMQPMessage $msg, string $exchangeName, string $routingKey ) {
		try {
			$this->MQChan->basic_publish(
				$msg,
				$exchangeName,
				$routingKey
			);
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Register a consumer and keep waiting on the channel for as long as the
	 * channel has registered callbacks.
	 *
	 * Synchronous and blocking; callers need to handle dropped connections.
	 *
	 * @param string   $queueName     Queue name.
	 * @param string   $tag           Consumer tag.
	 * @param bool     $noAck         Whether to consume in no-ack mode.
	 * @param callable $callback      Invoked for each delivered message.
	 * @param array    $arguments     Extra consume arguments.
	 * @param int|null $prefetchSize  QoS prefetch size; null is sent as 0.
	 * @param int      $prefetchCount QoS prefetch count.
	 * @param bool     $nonBlocking   Passed to wait() as its non-blocking flag.
	 * @param int      $timeOut       Passed to wait() as its timeout.
	 * @throws UtilException
	 */
	public function consumer(
		string   $queueName,
		string   $tag,
		bool     $noAck,
		callable $callback,
		array    $arguments = [],
		?int     $prefetchSize = null,
		int      $prefetchCount = 1,
		bool     $nonBlocking = false,
		int      $timeOut = 0
	) {
		try {
			$this->MQChan->basic_qos( $prefetchSize ?? 0, $prefetchCount, false );
			$this->MQChan->basic_consume(
				$queueName,
				$tag,
				false,
				$noAck,
				false,
				false,
				$callback,
				null,
				$arguments
			);
			while ( count( $this->MQChan->callbacks ) ) {
				$this->MQChan->wait( null, $nonBlocking, $timeOut );
			}
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Producer: declare the topology, publish one message in confirm mode and
	 * wait for the broker's acknowledgement.
	 *
	 * The queue name is "{queuePrefix}{chanName}" and the routing key is
	 * "{exchangeName}.{queueName}". A fresh message ID is stored in $messageId
	 * before validation runs.
	 *
	 * @param string $message  Message body; must not be an empty string.
	 * @param string $chanName Channel name appended to the queue prefix.
	 * @param array  $args     Extra queue declaration arguments.
	 * @return bool True when the ack handler fired, false otherwise.
	 * @throws UtilException
	 */
	public function send( string $message, string $chanName, array $args = [] ): bool {
		try {
			$this->messageId = Tool::makeSnowFlake();
			// Compare against '' explicitly: empty() would also reject "0".
			if ( $message === '' || $chanName === '' ) {
				throw new Exception( "MQ消息或通道名为空" );
			}
			$isSendOk = false;
			// Publisher-confirm callbacks record the broker's verdict.
			$this->MQChan->set_ack_handler( function () use ( &$isSendOk ) {
				$isSendOk = true;
			} );
			$this->MQChan->set_nack_handler( function () use ( &$isSendOk ) {
				$isSendOk = false;
			} );
			if ( $this->queuePrefix === '' ) {
				throw new Exception( "队列名不存在" );
			}
			if ( $this->exchangeName === '' ) {
				throw new Exception( "交换机不存在" );
			}
			$queueName  = "{$this->queuePrefix}{$chanName}";
			$routingKey = "{$this->exchangeName}.{$queueName}";
			$this->exchangeDeclare( $this->exchangeName, "topic", true, false );
			// Use the internal helper so $queuePrefix is not overwritten with
			// the full queue name, which would corrupt the next send().
			$this->declareQueue( $queueName, true, false, $args );
			$this->queueBind( $queueName, $this->exchangeName, $routingKey );
			$amqpMessage = $this->AMQPMessage( $message, $this->messageId, $chanName );
			// Enable confirm mode, publish, then wait for the broker's reply.
			$this->MQChan->confirm_select();
			$this->publish( $amqpMessage, $this->exchangeName, $routingKey );
			$this->MQChan->wait_for_pending_acks();

			return $isSendOk;
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Listener: declare the topology for a channel name and consume from its
	 * queue with manual acknowledgement (noAck = false).
	 *
	 * @param array    $params   Reads "chanName" (required) and "args"
	 *                           (optional queue declaration arguments).
	 * @param callable $callback Invoked for each delivered message.
	 * @throws UtilException
	 */
	public function listen( array $params, callable $callback ) {
		try {
			$chanName = (string) ( $params["chanName"] ?? "" );
			$args     = $params["args"] ?? [];
			// Compare against '' explicitly: empty() would also reject "0".
			if ( $chanName === '' ) {
				throw new Exception( "通道名为空" );
			}
			$tag        = $chanName;
			$queueName  = "{$this->queuePrefix}{$chanName}";
			$routingKey = "{$this->exchangeName}.{$queueName}";
			$this->exchangeDeclare( $this->exchangeName, "topic", true, false );
			// Internal helper: leaves $queuePrefix untouched.
			$this->declareQueue( $queueName, true, false, $args );
			$this->queueBind( $queueName, $this->exchangeName, $routingKey );
			$this->consumer( $queueName, $tag, false, $callback );
		} catch ( Throwable $th ) {
			throw new UtilException( $th->getMessage(), UtilException::EasyMQ_ERROR_CODE );
		}
	}

	/**
	 * Build an AMQPMessage carrying the given body and message ID.
	 *
	 * @param string $message   Message body.
	 * @param string $messageId Value stored in the "message_id" property.
	 * @param string $chanName  Currently unused; kept for interface compatibility.
	 * @return AMQPMessage
	 */
	public function AMQPMessage( string $message, string $messageId, string $chanName ): AMQPMessage {
		return new AMQPMessage( $message, [
			"message_id" => $messageId
		] );
	}

	/**
	 * Wrap an argument array in an AMQPTable.
	 *
	 * @param array $header
	 * @return Wire\AMQPTable
	 */
	private function AMQPTable( array $header ): Wire\AMQPTable {
		return new Wire\AMQPTable( $header );
	}

	/**
	 * Declare a queue on the channel without touching $queuePrefix.
	 *
	 * Errors are not wrapped here; each caller wraps them in UtilException.
	 *
	 * @param string $queueName  Queue name.
	 * @param bool   $durable    Whether the queue is durable.
	 * @param bool   $autoDelete Whether the queue is auto-deleted.
	 * @param array  $arguments  Extra declaration arguments.
	 */
	private function declareQueue( string $queueName, bool $durable, bool $autoDelete, $arguments ) {
		$this->MQChan->queue_declare(
			$queueName,
			false,
			$durable,
			false,
			$autoDelete,
			false,
			$this->AMQPTable( $arguments )
		);
	}

	/**
	 * Close the channel (if one was opened) and then the connection.
	 *
	 * The connection is closed even when closing the channel throws.
	 */
	public function close() {
		try {
			if ( isset( $this->MQChan ) ) {
				$this->MQChan->close();
			}
		} finally {
			$this->MQConn->close();
		}
	}
}